Para quem ainda não conhece Streams, é uma API incorporada no Java a partir da versão 8. Ela nos permite processar um fluxo de dados de forma declarativa, onde temos os elementos de entrada, o processamento dos elementos e a preparação para saída. Pense como uma linha de montagem de uma fábrica, onde os produtos/matéria-prima são recebidos, são tratados e então preparados para uso. Para maior proveito do artigo sugiro que conheça o funcionamento da API, pois iremos aprofundar um pouco mais no assunto daqui para frente.
Quando trabalhamos com uma Stream serial com muitos elementos, problemas de performance começam a aparecer durante o seu processamento, logo, pensamos que a solução seria processar a Stream paralelamente, sendo necessário criar threads e dividir o processamento em várias tarefas, realmente seria uma boa ideia, porém não é nenhum um pouco simples controlarmos todo esse processamento da melhor forma possível. Pensando em produtividade e conformidade a API Stream veio pronta para processamento paralelo, basta usarmos e seguirmos suas regras para alcançarmos o resultado desejado.
Uma das características mais relevantes da API de Streams é o que vamos abordar neste artigo, tratamento de concorrência nativa, que é capaz de processar os resultados concorrentemente usando múltiplas threads. Por padrão, o número de threads disponíveis em Stream paralelas é o mesmo que o número de CPUs do seu ambiente (que pode ser obtido por meio do método Runtime.getRuntime().availableProcessors() ).
Criando Streams Paralelas
Streams paralelas podem ser criadas a partir de dois métodos:
Stream<String> stream = Arrays.asList("casa", "janela", "porta").stream();
Stream<String> streamParela = stream.parallel();
Stream<String> streamParelela = Arrays.asList("casa", "janela", "porta").parallelStream();
Processando Tarefas em Paralelo
Em uma Stream serial podemos garantir a ordem que os elementos serão processados, porém com Streams paralelas não podemos garantir o mesmo. Existe uma versão do método forEach() chamada forEachOrdered(), que força uma Stream paralela a processar os resultados em ordem.
Arrays.asList(1, 2, 3)
.parallelStream()
.forEach(System.out::println);
Arrays.asList(1, 2, 3)
.parallelStream()
.forEachOrdered(System.out::println);
As Streams declaradas nas duas listagens acima possuem o mesmo conteúdo e são paralelas. Na Listagem 3 não conseguimos determinar a ordem que os elementos serão impressos, pois estamos trabalhando com processamento paralelo. Já na Listagem 4, o método forEachOrdered() garante que a Stream paralela será processa serialmente sendo possível garantir a ordem que os elementos serão impressos.
Efeitos colaterais podem aparecer em Streams paralelas se suas expressões lambdas forem stateful. Expressão lambda stateful é aquela que o resultado depende de um estado que pode mudar durante a execução do pipeline.
List<Integer> dados = Collections.synchronizedList(new ArrayList<>());
Arrays.asList(1, 2, 3)
.parallelStream()
.map(i -> {dados.add(i); return i;})
.forEachOrdered(i -> System.out.print(i + " "));
System.out.println();
for (Integer i : dados) {
// ordem indeterminada
System.out.print(i + " ");
}
Para exemplificar, na Listagem 5 foi usada uma expressão lambda stateful no método map de uma Stream paralela. Consideramos está expressão como stateful, pois ela adiciona um elemento na coleção que pode mudar durante a execução paralela, haja vista que mais de um elemento pode ser adicionado por vez, mudando assim seu estado.
Olhando superficialmente pensamos que o código acima imprime duas linhas iguais, porém não é bem assim. Não podemos garantir a ordem que o método map() executa cada elemento da Stream, pois estamos falando de Stream paralelas. Portanto, podemos garantir que o método forEachOrdered irá imprimir na ordem desejada, ou seja, “1 2 3”.
Desde que a ordem não é garantida em Streams paralelas, métodos como findAny() em Streams paralelas podem resultar em comportamentos inesperados, ou seja, não são predicáveis. Vejamos o exemplo abaixo:
System.out.println(
Arrays.asList("J", "A", "V", "A")
.stream()
.findAny().get()); // w
System.out.println(
Arrays.asList("J", "A", "V", "A")
.parallelStream()
.findAny().get()); // não é predicável
Na Listagem 6 o resultado do primeiro “println” é “A”, pois o processamento da Stream é serial. Já no segundo “println” não podemos garantir o mesmo, pois a Stream é processada de forma paralela.
Uma operação de redução recebe uma sequência de elementos de entrada e combina-os em um único resultado aplicando repetitivamente uma operação de combinação, tais como encontrar a soma ou máximo de um conjunto de números, ou elementos que se acumulam em uma lista. A API de Stream disponibilizada dois métodos: reduce e collect.
Transforma uma Stream em um único objeto, como por exemplo, uma Stream de String em uma única String com todos os elementos concatenados. Normalmente usamos o método reduce apenas com dois parâmetros, porém para reduções paralelas precisamos de mais um parâmetro, o combinador.
<U> U reduce(U identidade, BiFunction<U, ? super T,U> acumulador, BinaryOperator<U> combinador)
Para entendermos melhor, vejamos abaixo o papel de cada parâmetro:
System.out.println(
Arrays.asList("J", "A", "V", "A")
.parallelStream()
.reduce("", (s1, s2) -> s1 + s2, (s3, s4) -> s3 + s4));
Aplicando este processo no exemplo acima, foram formadas as Strings parciais “JA” e “VA” e então combinadas, resultando em “JAVA”.
Mais formalmente, os parâmetros do método reduce devem seguir algumas regras para obtermos o resultado desejado de uma Stream paralela, que são:
Referências: https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html
Postado por Diego Soares Rodrigues é um experiente profissional de TI relacionadas com os serviços financeiros, automação bancária e ATM. Experiência em C / C ++, Java, Linux, Shell Script, Visual Basic 6, Oracle PL / SQL e PostgreSQL PL / pgSQL. Sólidos conhecimentos de sistemas de integração usando Oracle Advanced Queuing (OAQ), Web Services e mensagens de soquete. Certificações Oracle Certified Associate, Java SE 8 Programador e Oracle Database SQL Certified Expert.
Este artigo foi revisto pela equipe de produtos Oracle e está em conformidade com as normas e práticas para o uso de produtos Oracle.